<?php
/**
 * author: NanQi
 * datetime: 2020/8/23 13:58
 */

namespace NanQi\Hope\Crontab\Batch;

use Hyperf\Crontab\PipeMessage;
use NanQi\Hope\Base\BaseCron;
use NanQi\Hope\Crontab\CrontabExecutor;
use NanQi\Hope\Helper;
use NanQi\Hope\Hope;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\WaitGroup;
use Swoole\Http\Server;

abstract class BaseBatchCron extends BaseCron implements BatchCronInterface, BatchPipeInterface
{
    use Helper;

    /**
     * @var Channel
     */
    public $idleWorkerIds;

    /**
     * @var WaitGroup
     */
    public $waitGroup;

    public $timeout = -1;

    public function getCrontabName()
    {
        $class = get_called_class();
        return str_replace('BatchCron', '', class_basename($class));
    }

    public function execute()
    {
        try {
            $redis = $this->getRedisBusiness();

            $workerNum = $this->getServer()->setting['worker_num'];
            $this->idleWorkerIds = new Channel($workerNum - 1);
            for ($i = 1; $i < $workerNum; $i++) {
                $this->idleWorkerIds->push($i);
            }

            $this->waitGroup = new WaitGroup();

            $list = $this->dispatch();

            foreach ($list as $item) {
                $workerId = $this->idleWorkerIds->pop();
                $this->waitGroup->add();

                if (CrontabExecutor::$running || $workerId == -1) {
                    $this->getServer()->sendMessage(new BatchPipeMessage(
                        $this->getBatchClass(), $item
                    ), $workerId);
                } else {
                    //批量任务中途结束，保存redis中
                    $this->getLog()->emergency("BATCHCRON:WAITING." . get_called_class() .
                        PHP_EOL . json_encode($item));
                    //SET类型，KEY: batchcron:waiting:{$class}
                    $redis->sAdd("batchcron:waiting:" . get_called_class(),
                        json_encode($item));

                    //跑完所有迭代器，然后标注状态，防止此刻再次回置
                    $this->idleWorkerIds->push(-1);
                }
            }

            $this->waitGroup->wait($this->timeout);
        } catch (\Exception $e) {
            $this->getLog()->error('BatchCronERROR:' . $e->getMessage()
                . PHP_EOL . $e->getTraceAsString());
        } catch (\Throwable $e) {
            $this->getLog()->error('BatchCronERROR:' . $e->getMessage()
                . PHP_EOL . $e->getTraceAsString());
        }
    }

    public function getBatchClass(): string
    {
        $class = get_called_class();

        $batchClass = substr($class, 0, strlen($class) - 4);
        if (class_exists($batchClass)) {
            return $batchClass;
        }
        $batchClass = str_replace('BatchCron', 'Batch', $class);
        if (class_exists($batchClass)) {
            return $batchClass;
        }

        throw new \RuntimeException('没有找到Batch类');
    }

    public function pipe($data, $fromWorkerId)
    {
        $this->idleWorkerIds->push($fromWorkerId);
        $this->waitGroup->done();
    }

    public static function run()
    {
        $server = Hope::getServer();
        if (!$server instanceof Server) {
            throw new \RuntimeException("异步运行必须使用多进程模型");
        }
        $workerId = $server->getWorkerId();

        /** @var BaseBatchCron $batchCron */
        $instance = di(static::class);
        if ($workerId != 0) {
            $server->sendMessage(new PipeMessage(
                'callback',
                [static::class, 'execute'],
                null
            ), 0);
        } else {
            $instance->execute();
        }
    }

    public static function runSync()
    {
        /** @var BaseBatchCron $batchCron */
        $batchCron = di(static::class);

        $list = $batchCron->dispatch();
        foreach ($list as $item) {
            /** @var BaseBatch $batch */
            $batch = di($batchCron->getBatchClass());

            $flg = $batch->execute($item);
            if (!$flg) {
                info($batchCron->getCrontabName().' return false');
            }
        }
    }
}